package com.song.sql;

import com.song.sql.entity.CityDetailAgg;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.udaf;


/**
 * udaf
 *
 */
/**
 * Demonstrates registering a typed UDAF ({@link CityDetailAgg}) with Spark SQL
 * and using it in a Hive-backed query that returns, per area, the top 3
 * products by click count along with an aggregated city-detail string.
 */
public class TestHiveloadSql002 {
    public static void main(String[] args) {

        // Set the Hadoop access user before any Hadoop/Hive interaction;
        // UserGroupInformation reads this system property when resolving the user.
        System.setProperty("HADOOP_USER_NAME", "SongXianYang");

        // SparkSession is AutoCloseable — try-with-resources guarantees the
        // session (and its resources) are released even if a query fails,
        // which the original straight-line close() did not.
        try (SparkSession sparkSession = SparkSession
                .builder()
                .enableHiveSupport() // enable Hive metastore / HiveQL support
                .master("local[*]")
                .appName("SparkSQL")
                .getOrCreate()) {

            // Register the custom aggregate so it can be called from SQL.
            // Encoders.STRING() describes the UDAF's input column type.
            sparkSession.udf().register("cityDetail", udaf(new CityDetailAgg(), Encoders.STRING()));

            sparkSession.sql("use spark_user_visit_action");

            // Inner query: clicks per (area, product) with the city-detail UDAF;
            // middle query: rank products within each area by click count;
            // outer query: keep only the top 3 per area.
            String sql = "select * from  (select *,\n" +
                    "       rank() over ( partition by area order by click_count desc ) rk\n" +
                    "from (select c.area, p.product_name, count(p.product_id) click_count,cityDetail(c.city_name)\n" +
                    "      from user_visit_action a\n" +
                    "               join city_info c on a.city_id = c.city_id\n" +
                    "               join product_info p on p.product_id = a.click_product_id\n" +
                    "      where click_category_id != -1\n" +
                    "      group by c.area, p.product_name\n" +
                    "    ))t1 where rk <= 3";

            sparkSession.sql(sql).show();
        }
    }
}
